package com.superid.schema;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Messages sent to Kafka are Avro-encoded, so schema information is required.
 * Example usage:
 *         Schema parsedSchema = new MySchema.Builder()
 *                        .type(MySchema.RECORD_TYPE)
 *                        .name("user_option8")
 *                        .field("allianceId","long")
 *                        .field("affairId","long")
 *                        .field("userId","int")
 *                        .field("opType","string")
 *                        .field("beOperatedRoleId","string")
 *                        .field("attrs","{\"type\": \"map\", \"values\":\"string\"}")
 *                        .build()
 *                        .getParsedSchema();
 *
 *        org.apache.avro.generic.GenericRecord avroRecord = new org.apache.avro.generic.GenericData.Record(parsedSchema);
 *
 *        Map&lt;String, Object&gt; map = new HashMap&lt;&gt;();
 *        map.put("name", "pilaf");
 *        map.put("age", "26");
 *        //put msg key-value here, for example
 *        avroRecord.put("allianceId", 11L);
 *        avroRecord.put("opType","c");
 *        avroRecord.put("attrs",map);
 *        //the ProducerRecord being sent must wrap the avroRecord
 *        kafkaProducer.send(new ProducerRecord("kafka_topic_here", kafka_key , avroRecord));
 *
 * @author dufeng
 * @create: 2018-08-06 13:55
 */
public class MySchema {
    /**
     * The "record" type is used for Hive tables; the kafka hdfs connector can
     * load such data directly into a Hive table for querying.
     */
    public static final String RECORD_TYPE = "record";

    /**
     * Schema type; defaults to "record".
     */
    private String type = MySchema.RECORD_TYPE;
    /**
     * Schema name.
     */
    private String name;
    /**
     * Schema fields; each entry is a map with at least "name" and "type" keys.
     */
    private List<Map<String, String>> fields = new ArrayList<>();

    /**
     * No public constructor: instances are created through {@link Builder}.
     */
    private MySchema(Builder builder) {
        type = builder.type;
        name = builder.name;
        fields = builder.fields;
    }


    /**
     * Fluent builder for {@link MySchema}.
     */
    public static final class Builder {
        private String type;
        private String name;
        private List<Map<String, String>> fields = new ArrayList<>();

        public Builder() {
        }

        /**
         * Sets the schema type (e.g. {@link MySchema#RECORD_TYPE}).
         */
        public Builder type(String val) {
            this.type = val;
            return this;
        }

        /**
         * Sets the schema name.
         */
        public Builder name(String val) {
            this.name = val;
            return this;
        }

        /**
         * Replaces the whole field list at once.
         */
        public Builder fields(List<Map<String, String>> fieldMap) {
            this.fields = fieldMap;
            return this;
        }

        /**
         * Adds one field by name and type. A type starting with '{' is treated
         * as an inline complex-type JSON fragment by {@link MySchema#toString()}.
         *
         * @param name field name
         * @param type field type; lower-cased here (may be null)
         */
        public Builder field(String name, String type) {
            if (type != null) {
                // Locale.ROOT avoids locale-dependent lowercasing (e.g. Turkish
                // dotless-i turning "INT" into an invalid Avro type name).
                type = type.toLowerCase(Locale.ROOT);
            }

            Map<String, String> aFieldMap = new HashMap<>();
            aFieldMap.put("name", name);
            aFieldMap.put("type", type);
            fields.add(aFieldMap);

            return this;
        }

        /**
         * Adds one field from a prebuilt map (must contain "name" and "type").
         */
        public Builder field(Map<String, String> map) {
            fields.add(map);
            return this;
        }

        public MySchema build() {
            return new MySchema(this);
        }
    }


    /**
     * Renders the schema as an Avro-compatible JSON string, e.g.
     * {@code {"type":"record","name":"n","fields":[{"name":"f","type":"long"}]}}.
     * A field type beginning with '{' is emitted verbatim as a nested JSON
     * object; any other type is emitted as a quoted string.
     */
    @Override
    public String toString() {
        StringBuilder fieldStr = new StringBuilder("[");
        for (int i = 0; i < fields.size(); i++) {
            Map<String, String> map = fields.get(i);
            // Separator between entries only, so an empty field list yields "[]"
            // instead of corrupt JSON (the old deleteCharAt approach removed the
            // opening '[' when no fields were present).
            if (i > 0) {
                fieldStr.append(",");
            }
            fieldStr.append("{");
            fieldStr.append("\"name\":").append("\"").append(map.get("name")).append("\",");
            String fieldType = map.get("type");
            // Guard against a null type (the field(String,String) overload
            // permits null) instead of throwing NPE on startsWith.
            if (fieldType != null && fieldType.startsWith("{")) {
                fieldStr.append("\"type\":").append(fieldType);
            } else {
                fieldStr.append("\"type\":").append("\"").append(fieldType).append("\"");
            }
            fieldStr.append("}");
        }
        fieldStr.append("]");

        return "{" +
                "\"type\":\"" + type + '\"' +
                ",\"name\":\"" + name + '\"' +
                ",\"fields\":" + fieldStr +
                '}';
    }

    /**
     * Parses {@link #toString()} into an Avro {@link Schema}.
     *
     * @return the parsed Avro schema
     */
    public Schema getParsedSchema() {
        Schema.Parser parser = new Schema.Parser();
        return parser.parse(this.toString());
    }
}
